import pandas, numpy, dask, bokeh, numba, scipy, prefect, intake
import dask.dataframe as dd
from dask.distributed import Client
import hvplot.pandas
import hvplot.dask
import os, struct, time, tqdm.notebook, dask.diagnostics
# Remote scheduler address — unused below: the Client() call starts a *local*
# cluster instead of connecting to tcp://{server}:{port}.
server,port = "192.168.1.90","8786"
# Local Dask cluster with default settings; tuning kwargs kept commented out.
client = Client()#(n_workers=1,processes=True,threads_per_worker=4, memory_limit=0.5)
# Give the workers a moment to register before submitting work.
time.sleep(10)
17:40:05.057 | INFO | distributed.scheduler - State start
17:40:05.064 | INFO | distributed.scheduler - Scheduler at: tcp://127.0.0.1:35957
17:40:05.066 | INFO | distributed.scheduler - dashboard at: 127.0.0.1:8787
17:40:05.082 | INFO | distributed.nanny - Start Nanny at: 'tcp://127.0.0.1:41627'
17:40:05.092 | INFO | distributed.nanny - Start Nanny at: 'tcp://127.0.0.1:43415'
17:40:05.097 | INFO | distributed.nanny - Start Nanny at: 'tcp://127.0.0.1:38845'
17:40:05.103 | INFO | distributed.nanny - Start Nanny at: 'tcp://127.0.0.1:34063'
17:40:05.724 | INFO | distributed.scheduler - Register worker <WorkerState 'tcp://127.0.0.1:46483', name: 1, status: init, memory: 0, processing: 0>
17:40:05.729 | INFO | distributed.scheduler - Starting worker compute stream, tcp://127.0.0.1:46483
17:40:05.731 | INFO | distributed.core - Starting established connection to tcp://127.0.0.1:48552
17:40:05.757 | INFO | distributed.scheduler - Register worker <WorkerState 'tcp://127.0.0.1:35691', name: 3, status: init, memory: 0, processing: 0>
17:40:05.759 | INFO | distributed.scheduler - Starting worker compute stream, tcp://127.0.0.1:35691
17:40:05.762 | INFO | distributed.core - Starting established connection to tcp://127.0.0.1:48554
17:40:05.765 | INFO | distributed.scheduler - Register worker <WorkerState 'tcp://127.0.0.1:40169', name: 2, status: init, memory: 0, processing: 0>
17:40:05.768 | INFO | distributed.scheduler - Starting worker compute stream, tcp://127.0.0.1:40169
17:40:05.770 | INFO | distributed.core - Starting established connection to tcp://127.0.0.1:48564
17:40:05.774 | INFO | distributed.scheduler - Register worker <WorkerState 'tcp://127.0.0.1:33695', name: 0, status: init, memory: 0, processing: 0>
17:40:05.776 | INFO | distributed.scheduler - Starting worker compute stream, tcp://127.0.0.1:33695
17:40:05.778 | INFO | distributed.core - Starting established connection to tcp://127.0.0.1:48550
17:40:05.790 | INFO | distributed.scheduler - Receive client connection: Client-69ee05bc-ffc9-11ed-949e-b4b5b6a4f425
17:40:05.791 | INFO | distributed.core - Starting established connection to tcp://127.0.0.1:48572
@numba.njit()
def detector_from_pixel(pixel_id):
    """Return the crystal/detector index for a raw pixel id.

    Ids come in three 131072-wide ranges; the upper two carry extra
    offsets (1024 and 2048) before the 128-pixels-per-crystal division.
    """
    bank = pixel_id // 131072
    if bank >= 2:
        return (pixel_id % 262144 + 2048) // 128
    if bank == 1:
        return (pixel_id % 131072 + 1024) // 128
    return pixel_id // 128
# Crystal lookup table: crystal_id -> iTED module, crystal position within the
# module, and the raw-pixel-id offset of that crystal's first pixel.
# Note the gaps: crystal ids 9 and 17 are intentionally absent.
deconvolution_dict = {
    crystal_id: {"iTED": ited, "Crystal": crystal, "Offset": offset}
    for crystal_id, ited, crystal, offset in (
        (0, 1, 0, 0),
        (1, 1, 1, 128),
        (2, 1, 2, 256),
        (3, 1, 3, 384),
        (4, 1, 4, 512),
        (5, 2, 0, 640),
        (6, 2, 1, 768),
        (7, 2, 2, 896),
        (8, 3, 0, 131072),
        (10, 3, 1, 131328),
        (11, 3, 2, 131456),
        (12, 3, 3, 131584),
        (13, 3, 4, 131712),
        (14, 2, 3, 131840),
        (15, 2, 4, 131968),
        (16, 4, 0, 262144),
        (18, 4, 1, 262400),
        (19, 4, 2, 262528),
        (20, 4, 3, 262656),
        (21, 4, 4, 262784),
    )
}
deconvolution_df = pandas.DataFrame(deconvolution_dict).T
def deconvolute(x):
    """Return the crystal id for raw pixel id *x*.

    Pure-Python twin of the numba-jitted ``detector_from_pixel`` (same
    arithmetic), usable from pandas/dask ``map``. Converted from a lambda
    assigned to a name to a plain ``def`` (PEP 8 E731).
    """
    if x >= 131072 * 2:
        return (x % (131072 * 2) + 2048) // 128
    if x >= 131072:
        return (x % 131072 + 1024) // 128
    return x // 128
# Master list of detector maps: column 1 = crystal id, column 2 = map file name.
# Each "pixel_map" cell is replaced (via the converter) by the parsed map file
# itself — a DataFrame whose (row, col) cells hold pixel numbers.
pixel_maps = pandas.read_csv(
    "../../data/DetectorMaps/DETECTOR_MAPS",
    header=None,
    sep=r"\s+",  # delim_whitespace= is deprecated since pandas 2.1
    usecols=(1,2),
    names=("crystal_id","pixel_map"),
    converters={
        # s[2:] drops the first two characters of the map path — presumably a
        # leading "./"; TODO confirm against the DETECTOR_MAPS file format.
        "pixel_map": lambda s: pandas.read_csv(
            f"../../data/DetectorMaps/{s[2:]}",
            header=None,
            sep=r"\s+",
        )
    }
)
#pixel_maps.query('crystal_id == 6').pixel_map.item().stack()
#.to_frame("pixel_id").query("pixel_id == 6").index.item()
# Background acquisitions keyed by the threshold value they were taken at.
#   "path": raw singles .ldat file of the run
#   "time": acquisition length in seconds (used to normalise counts to rates)
#   "dd":   filled later with the per-pixel count-rate table
th_dict = {
    7:{
        "path": "/run/media/bgameiro/d043b5e4-57a4-457e-8839-cb1adc9c72bc//Data/data_2023_05_31/Background_D.2023_05_31_T.17_08_58_C.itedE_2023.05.30_4.0v_887_60s.singles.ldat",
        "time": 60,
        "dd": None
    },
    8:{
        "path": "/run/media/bgameiro/d043b5e4-57a4-457e-8839-cb1adc9c72bc//Data/data_2023_05_31/Background_D.2023_05_31_T.17_10_07_C.itedE_2023.05.30_4.0v_888_60s.singles.ldat",
        "time": 60,
        "dd": None
    },
    9:{
        "path": "/run/media/bgameiro/d043b5e4-57a4-457e-8839-cb1adc9c72bc//Data/data_2023_05_31/Background_D.2023_05_31_T.17_11_21_C.itedE_2023.05.30_4.0v_889_60s.singles.ldat",
        "time": 60,
        "dd": None
    },
    10:{
        "path": "/run/media/bgameiro/d043b5e4-57a4-457e-8839-cb1adc9c72bc//Data/data_2023_05_31/Background_D.2023_05_31_T.17_12_50_C.itedE_2023.05.30_4.0v_8810_60s.singles.ldat",
        "time": 60,
        "dd": None
    },
    11:{
        "path": "/run/media/bgameiro/d043b5e4-57a4-457e-8839-cb1adc9c72bc//Data/data_2023_05_31/Background_D.2023_05_31_T.17_14_26_C.itedE_2023.05.30_4.0v_8811_60s.singles.ldat",
        "time": 60,
        "dd": None
    },
    12:{
        "path": "/run/media/bgameiro/d043b5e4-57a4-457e-8839-cb1adc9c72bc//Data/data_2023_05_31/Background_D.2023_05_31_T.17_15_47_C.itedE_2023.05.30_4.0v_8812_60s.singles.ldat",
        "time": 60,
        "dd": None
    },
    13:{
        "path": "/run/media/bgameiro/d043b5e4-57a4-457e-8839-cb1adc9c72bc//Data/data_2023_05_31/Background_D.2023_05_31_T.17_17_01_C.itedE_2023.05.30_4.0v_8813_60s.singles.ldat",
        "time": 60,
        "dd": None
    },
    # Earlier run kept for reference, not processed:
    #0:{
    #    "path": "/run/media/bgameiro/d043b5e4-57a4-457e-8839-cb1adc9c72bc//Data/data_2023_05_21/background_D.2023_05_08_T.15_38_52_C.itedABCD_lab_custom_2023.02.22_4.0v_887_120s.singles.ldat",
    #    "time": 120,
    #    "dd": None
    #},
}
def get_file(file_name):
    """Load an entire .ldat singles file into a structured numpy array.

    Each record is 16 bytes: int64 timestamp, float32 energy, int32 pixel id.
    """
    record = numpy.dtype(
        [("time", numpy.int64), ("energy", numpy.float32), ("id", numpy.int32)]
    )
    return numpy.fromfile(file_name, dtype=record)
def get_entry_from_binary3(file_np):
    """Expose a structured singles array through the dask dataframe API.

    The data stays in memory; only the interface is lazy.
    """
    record = numpy.dtype(
        [("time", numpy.int64), ("energy", numpy.float32), ("id", numpy.int32)]
    )
    # Local renamed from `dd` so it no longer shadows the
    # `dask.dataframe as dd` alias imported at the top of the file.
    frame = dask.dataframe.from_array(file_np, columns=record.names)
    return frame
def get_entry_from_binary4(file_np):
    """Convert a structured singles array into an in-memory pandas DataFrame."""
    record = numpy.dtype(
        [("time", numpy.int64), ("energy", numpy.float32), ("id", numpy.int32)]
    )
    return pandas.DataFrame(file_np, columns=record.names)
@dask.delayed
def get_entry_from_binary(file_name, chunk_size=8+4+4):
    """Yield one {"time", "energy", "id"} dict per 16-byte record of *file_name*.

    NOTE(review): wrapping a *generator function* in ``dask.delayed`` is
    suspect — computing the delayed result yields the generator object, not
    the entries. This helper appears unused; confirm before relying on it.
    """
    with open(file_name, "rb") as f:
        # Walrus loop: read fixed-size records until EOF (empty bytes is falsy).
        while (entry := f.read(chunk_size)):
            yield dict(zip(("time","energy","id"), struct.unpack("qfi", entry)))
#file = "../../data/Multi_iTED_characterization/test_D.2023_03_31_T.16_26_55_C.itedABCD_lab_custom_2023.02.22_4.0v_887_5s.singles.ldat"
# For every threshold run: load the singles file, count events per pixel id,
# and normalise by the acquisition time to get rates in counts/second.
with tqdm.notebook.tqdm(total=len(th_dict)) as pbar1, tqdm.notebook.tqdm(total=5) as pbar2:
    for i in th_dict:
        pbar2.reset()  # fix: restart the 5-step inner bar for each threshold
        pbar1.set_description(f"Processing th={i:2}")
        pbar2.set_description("Entries")
        entries_df = get_entry_from_binary4(get_file(th_dict[i]["path"]))
        pbar2.update(1)
        pbar2.set_description("Counts")
        th_dict[i]["dd"] = entries_df.id\
            .value_counts()\
            .to_frame(name="counts")
        pbar2.update(1)
        pbar2.set_description("Normalize")
        # counts -> counts per second
        th_dict[i]["dd"].counts /= th_dict[i]["time"]
        pbar2.update(1)
        pbar2.set_description("ID th")
        th_dict[i]["dd"]["th"] = i
        pbar2.update(1)
        pbar2.set_description("ID pixel")
        th_dict[i]["dd"]["id"] = th_dict[i]["dd"].index
        pbar2.update(1)  # fix: was updated twice here, overflowing the 5-step bar
        pbar1.update(1)
# Lazy dask pipeline: stack all thresholds and derive geometry columns.
# NB: this `dd` shadows the `dask.dataframe as dd` alias from the imports.
dd = dask.dataframe.concat([th_dict[i]["dd"] for i in th_dict], ignore_order=True)
# Crystal id from raw pixel id; the meta tuples tell dask the output dtype
# so it does not have to guess by running the function on sample data.
dd["crystal_id"] = dd.id.map(deconvolute, meta=("x", "int8"))
dd["iTED"] = dd.crystal_id.map(lambda x: deconvolution_dict[x]["iTED"], meta=("x", "int8"))
dd["crystal"] = dd.crystal_id.map(lambda x: deconvolution_dict[x]["Crystal"], meta=("x", "int8"))
# Pixel number within its crystal (0-127 per crystal).
dd["pixel"] = dd.id.map(lambda x: x - deconvolution_dict[deconvolute(x)]["Offset"], meta=("x", "int8"))
# Look the pixel number up in the crystal's detector map; the stacked map's
# index entry gives the pixel's position on the crystal face.
dd["coordinates"] = dd.apply(
    lambda entry: pixel_maps.query(f'crystal_id == {entry.crystal_id}')
        .pixel_map
        .item()
        .stack()
        .to_frame("pixel_id")
        .query(f"pixel_id == {entry.pixel}")
        .index
        .item(),
    axis=1,
    meta=(None, 'object')
)
# coordinates appears to be a (row, col) pair: y from [0], x from [1].
dd['x'] = dd.coordinates.map(lambda x: x[1], meta=("x", "int8"))
dd['y'] = dd.coordinates.map(lambda x: x[0], meta=("x", "int8"))
0%| | 0/7 [00:00<?, ?it/s]
0%| | 0/5 [00:00<?, ?it/s]
#import pickle
#with open('dict.pickle', 'wb') as handle:
# pickle.dump(th_dict, handle, protocol=pickle.HIGHEST_PROTOCOL)
#import pickle
#with open('dict.pickle', 'rb') as handle:
# th_dict = pickle.load(handle)
# Distribution of per-pixel rates, split by threshold and crystal.
dd.hvplot.box('counts', by=['th', 'crystal_id'])
# Total rate per threshold (this triggers the dask computation).
dd.groupby("th").counts.sum().compute()
th 7 326638.166667 8 221814.750000 9 170570.483333 10 157176.716667 11 152544.783333 12 149004.533333 13 146473.966667 Name: counts, dtype: float64
dd.counts.nlargest(10).compute()
681 95714.566667 681 51946.583333 681 11217.366667 693 10898.883333 679 8760.000000 661 5535.033333 687 5223.416667 702 4598.300000 656 4553.900000 653 3603.666667 Name: counts, dtype: float64
with dask.diagnostics.ProgressBar():
    global ddd  # NOTE(review): `global` at module level is a no-op
    # Base selection: start every pixel at the lowest threshold.
    ddd = dd.query('th == 7').compute() # Base threshold
ddd = ddd.set_index('id')
# A pixel counts as noisy above 5x the median rate of its crystal.
threshold_counts = ddd.groupby("crystal_id").counts.median()*5
# Walk the thresholds in increasing order; pixels still above their crystal's
# noise cut get their row replaced with the higher-threshold measurement, so
# each pixel ends at the lowest threshold where it is quiet.
with tqdm.notebook.tqdm(total=len(th_dict)) as pbar1, tqdm.notebook.tqdm(total=len(threshold_counts)) as pbar2:
    for th in th_dict:
        pbar2.reset()
        pbar1.set_description( f"Threshold: {th:2}")
        for crystal in ddd.crystal_id.unique():
            pbar2.set_description(f"Crystal: {int(crystal):4}")
            # NOTE(review): the inner query applies *this* crystal's cut to all
            # rows without also filtering on crystal_id — confirm intended.
            # The `@` names resolve against this loop's locals via pandas.query.
            ddd.update(
                dd.query(f"th == {th} & id in {list(ddd.query('counts > @threshold_counts[@crystal]').index.values)}")\
                    .compute()\
                    .set_index('id')
            )
            pbar2.update(1)
        pbar1.update(1)
0%| | 0/7 [00:00<?, ?it/s]
0%| | 0/5 [00:00<?, ?it/s]
# Materialise the index back into a column for later plotting/export.
ddd["id"] = ddd.index
print("Total sum:")
print(ddd.counts.sum())
print("Most active:")
print(ddd.counts.nlargest(10))
print("Stats:")
ddd.counts.describe(percentiles=[.9,.95])
Total sum: 155749.09999999998 Most active: id 545 1128.866667 565 1118.516667 544 1116.300000 549 1113.566667 534 1111.383333 513 1103.550000 538 1102.883333 521 1101.550000 557 1101.433333 533 1101.183333 Name: counts, dtype: float64 Stats:
count 320.000000 mean 486.715937 std 401.171627 min 48.316667 50% 255.408333 90% 1032.565000 95% 1090.893333 max 1128.866667 Name: counts, dtype: float64
# Per-crystal rate distribution after threshold optimisation.
ddd.hvplot.box('counts', by='crystal_id',hover_cols=["id","th"])
# Crystal to see
# Per-crystal 2D hit map on the detector face.
ddd.hvplot.heatmap(
    x='x', y='y', C='counts', groupby='crystal_id', by='crystal_id',
    height=500, width=500, colorbar=False,
    hover_cols=["id","th"]
)
# How many pixels ended up at each threshold.
ddd.th.value_counts()
7.0 165 13.0 128 8.0 19 9.0 7 11.0 1 Name: th, dtype: int64
# Disabled cell (kept as a string literal, never executed): builds the
# per-channel threshold table `improved_threshold` for export.
"""
improved_threshold=ddd[["th","id"]].astype("int32")#.query("th != 7")#.to_csv("thresholds.csv",index=False)
improved_threshold["detector"] = improved_threshold.id.map(detector_from_pixel)
improved_threshold["vth_t1"] = 8
improved_threshold["vth_t2"] = 8
improved_threshold["channelID"] = improved_threshold.id.map(lambda x: x-deconvolution_dict[detector_from_pixel(x)]["Offset"])
improved_threshold["slaveID"] = 0
improved_threshold["#portID"] = improved_threshold.id // 131072
improved_threshold["chipID"] = (improved_threshold.id - improved_threshold["#portID"] * 131072) // 64
improved_threshold.rename(columns={"th": "vth_e"},inplace=True)
"""
#improved_threshold
| vth_e | id | detector | vth_t1 | vth_t2 | channelID | slaveID | #portID | chipID | |
|---|---|---|---|---|---|---|---|---|---|
| id | |||||||||
| 0 | 7 | 0 | 0 | 8 | 8 | 0 | 0 | 0 | 0 |
| 1 | 7 | 1 | 0 | 8 | 8 | 1 | 0 | 0 | 0 |
| 2 | 7 | 2 | 0 | 8 | 8 | 2 | 0 | 0 | 0 |
| 3 | 7 | 3 | 0 | 8 | 8 | 3 | 0 | 0 | 0 |
| 4 | 7 | 4 | 0 | 8 | 8 | 4 | 0 | 0 | 0 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 699 | 8 | 699 | 5 | 8 | 8 | 59 | 0 | 0 | 10 |
| 700 | 7 | 700 | 5 | 8 | 8 | 60 | 0 | 0 | 10 |
| 701 | 7 | 701 | 5 | 8 | 8 | 61 | 0 | 0 | 10 |
| 702 | 9 | 702 | 5 | 8 | 8 | 62 | 0 | 0 | 10 |
| 703 | 7 | 703 | 5 | 8 | 8 | 63 | 0 | 0 | 10 |
320 rows × 9 columns
#improved_threshold.to_csv('disc_settingsE.tsv', sep="\t", columns=["#portID","slaveID","chipID","channelID","vth_t1","vth_t2","vth_e"],index=False)
# Disabled cell (string literal, never executed): first batch of manual
# per-pixel threshold overrides applied into `ddd`.
"""
ddd.update(
    dask.dataframe.concat([
        # List changes
        dd.query('th == 9' ).loc[131640],
        dd.query('th == 11').loc[262173],
        dd.query('th == 9' ).loc[546],
        dd.query('th == 8' ).loc[131615],
        dd.query('th == 8' ).loc[131625],
        dd.query('th == 8' ).loc[262172],
        dd.query('th == 9' ).loc[802],
        dd.query('th == 8' ).loc[131390],
        dd.query('th == 8' ).loc[262155],
        dd.query('th == 8' ).loc[131624],
        dd.query('th == 8' ).loc[131638],
        dd.query('th == 8' ).loc[131360],
        dd.query('th == 8' ).loc[131858],
        dd.query('th == 8' ).loc[131614],
    ]).compute().set_index('id')
)
"""
"\nddd.update(\n dask.dataframe.concat([\n # List changes\n dd.query('th == 9' ).loc[131640],\n dd.query('th == 11').loc[262173],\n dd.query('th == 9' ).loc[546],\n dd.query('th == 8' ).loc[131615],\n dd.query('th == 8' ).loc[131625],\n dd.query('th == 8' ).loc[262172],\n dd.query('th == 9' ).loc[802],\n dd.query('th == 8' ).loc[131390],\n dd.query('th == 8' ).loc[262155],\n dd.query('th == 8' ).loc[131624],\n dd.query('th == 8' ).loc[131638],\n dd.query('th == 8' ).loc[131360],\n dd.query('th == 8' ).loc[131858],\n dd.query('th == 8' ).loc[131614],\n ]).compute().set_index('id')\n)\n"
# Disabled cell (string literal, never executed): second batch of manual
# per-pixel threshold overrides, grouped by crystal (#0, #1, #6).
"""
ddd.update(
    dask.dataframe.concat([
        # List changes
        #0
        dd.query('th == 8').loc[39],
        dd.query('th == 8').loc[53],
        dd.query('th == 8').loc[31],
        dd.query('th == 8').loc[37],
        dd.query('th == 8').loc[34],
        dd.query('th == 8').loc[ 5],
        dd.query('th == 9').loc[ 8],
        dd.query('th == 8').loc[30],
        dd.query('th == 8').loc[24],
        dd.query('th == 8').loc[21],
        dd.query('th == 8').loc[19],
        dd.query('th == 8').loc[ 7],
        #1
        dd.query('th == 8').loc[168],
        dd.query('th == 8').loc[185],
        #6
        dd.query('th == 9').loc[774],
        dd.query('th == 9').loc[769],
        dd.query('th == 8').loc[796],
        dd.query('th == 8').loc[810],
        dd.query('th == 8').loc[801],
        dd.query('th == 10').loc[802],
    ]).compute().set_index('id')
)

ddd["id"] = ddd.index

ddd.counts.sum()
"""
'\nddd.update(\n dask.dataframe.concat([\n # List changes\n #0\n dd.query(\'th == 8\').loc[39],\n dd.query(\'th == 8\').loc[53],\n dd.query(\'th == 8\').loc[31],\n dd.query(\'th == 8\').loc[37],\n dd.query(\'th == 8\').loc[34],\n dd.query(\'th == 8\').loc[ 5],\n dd.query(\'th == 9\').loc[ 8],\n dd.query(\'th == 8\').loc[30],\n dd.query(\'th == 8\').loc[24],\n dd.query(\'th == 8\').loc[21],\n dd.query(\'th == 8\').loc[19],\n dd.query(\'th == 8\').loc[ 7],\n #1\n dd.query(\'th == 8\').loc[168],\n dd.query(\'th == 8\').loc[185],\n #6\n dd.query(\'th == 9\').loc[774],\n dd.query(\'th == 9\').loc[769],\n dd.query(\'th == 8\').loc[796],\n dd.query(\'th == 8\').loc[810],\n dd.query(\'th == 8\').loc[801],\n dd.query(\'th == 10\').loc[802],\n ]).compute().set_index(\'id\')\n)\n\nddd["id"] = ddd.index\n\nddd.counts.sum()\n'
# Disabled cell (string literal, never executed): same per-pixel processing as
# the main loop above, applied eagerly to the reference run th_dict[0].
"""
entries_df = get_entry_from_binary4(get_file(th_dict[0]["path"]))

th_dict[0]["dd"] = entries_df.id\
    .value_counts()\
    .to_frame(name="counts")

th_dict[0]["dd"].counts /= th_dict[0]["time"]
th_dict[0]["dd"]["id"] = th_dict[0]["dd"].index

th_dict[0]["dd"]["crystal_id"] = th_dict[0]["dd"].id.map(deconvolute)
th_dict[0]["dd"]["iTED"] = th_dict[0]["dd"].crystal_id.map(lambda x: deconvolution_dict[x]["iTED"])
th_dict[0]["dd"]["crystal"] = th_dict[0]["dd"].crystal_id.map(lambda x: deconvolution_dict[x]["Crystal"])
th_dict[0]["dd"]["pixel"] = th_dict[0]["dd"].id.map(lambda x: x - deconvolution_dict[deconvolute(x)]["Offset"])
th_dict[0]["dd"]["coordinates"] = th_dict[0]["dd"].apply(
    lambda entry: pixel_maps.query(f'crystal_id == {entry.crystal_id}')
        .pixel_map
        .item()
        .stack()
        .to_frame("pixel_id")
        .query(f"pixel_id == {entry.pixel}")
        .index
        .item(),
    axis=1,
)

th_dict[0]["dd"]['x'] = th_dict[0]["dd"].coordinates.map(lambda x: x[1])
th_dict[0]["dd"]['y'] = th_dict[0]["dd"].coordinates.map(lambda x: x[0])
"""
'\nentries_df = get_entry_from_binary4(get_file(th_dict[0]["path"]))\n\nth_dict[0]["dd"] = entries_df.id .value_counts() .to_frame(name="counts")\n\nth_dict[0]["dd"].counts /= th_dict[0]["time"]\nth_dict[0]["dd"]["id"] = th_dict[0]["dd"].index\n\nth_dict[0]["dd"]["crystal_id"] = th_dict[0]["dd"].id.map(deconvolute)\nth_dict[0]["dd"]["iTED"] = th_dict[0]["dd"].crystal_id.map(lambda x: deconvolution_dict[x]["iTED"])\nth_dict[0]["dd"]["crystal"] = th_dict[0]["dd"].crystal_id.map(lambda x: deconvolution_dict[x]["Crystal"])\nth_dict[0]["dd"]["pixel"] = th_dict[0]["dd"].id.map(lambda x: x - deconvolution_dict[deconvolute(x)]["Offset"])\nth_dict[0]["dd"]["coordinates"] = th_dict[0]["dd"].apply(\n lambda entry: pixel_maps.query(f\'crystal_id == {entry.crystal_id}\')\n .pixel_map\n .item()\n .stack()\n .to_frame("pixel_id")\n .query(f"pixel_id == {entry.pixel}")\n .index\n .item(),\n axis=1,\n)\n\nth_dict[0]["dd"][\'x\'] = th_dict[0]["dd"].coordinates.map(lambda x: x[1])\nth_dict[0]["dd"][\'y\'] = th_dict[0]["dd"].coordinates.map(lambda x: x[0])\n'